In [ ]:
from elasticsearch import Elasticsearch
# Elasticsearch client shared by every cell below; no hosts are passed, so the
# client library's default connection settings are used.
es = Elasticsearch(hosts=None)
In [ ]:
def _not_analyzed_string():
    # Exact-value string field (indexed as a single, untokenized term).
    # NOTE: "string" + "not_analyzed" is pre-5.x Elasticsearch mapping syntax.
    return {"type": "string", "index": "not_analyzed"}


def _model_vector_mapping():
    # Mapping for the "@model" object written back from ALS:
    #   factor  - "index|value" tokens (see convert_vector) analysed by
    #             payload_analyzer, with payloads kept in the term vectors so a
    #             scoring script can read them back.
    #   version - identifier of the model that produced the factor.
    # A fresh dict is returned on every call so the users/movies mappings do
    # not share (and cannot accidentally co-mutate) the same object.
    return {
        "properties": {
            "factor": {
                "type": "string",
                "term_vector": "with_positions_offsets_payloads",
                "analyzer": "payload_analyzer",
            },
            "version": _not_analyzed_string(),
        }
    }


# Index settings + mappings for the "demo" index.
# - payload_analyzer: whitespace tokenizer followed by delimited_payload_filter,
#   so each "index|value" token carries its value as a payload.
# - ratings / users / movies: one mapping type each (multi-type indices are
#   also pre-6.x Elasticsearch behaviour).
create_index = {
    "settings": {
        "analysis": {
            "analyzer": {
                "payload_analyzer": {
                    "type": "custom",
                    "tokenizer": "whitespace",
                    "filter": "delimited_payload_filter",
                }
            }
        }
    },
    "mappings": {
        "ratings": {
            "properties": {
                "timestamp": {"type": "date"},
                "userId": _not_analyzed_string(),
                "movieId": _not_analyzed_string(),
                "rating": {"type": "double"},
            }
        },
        "users": {
            "properties": {
                "name": {"type": "string"},
                "@model": _model_vector_mapping(),
            }
        },
        "movies": {
            "properties": {
                "genres": {"type": "string"},
                "original_language": _not_analyzed_string(),
                "image_url": _not_analyzed_string(),
                "release_date": {"type": "date"},
                "popularity": {"type": "double"},
                "@model": _model_vector_mapping(),
            }
        },
    },
}
# Create the "demo" index from the analysis settings and per-type mappings in
# create_index (left as the cell's final expression so the response is shown).
es.indices.create(body=create_index, index="demo")
In [ ]:
# Load the "users" type of the "demo" index through the es-hadoop data source,
# then preview its schema and a few rows.
user_df = sqlContext.read.load("demo/users", format="es")
user_df.printSchema()
user_df.select("userId", "name").show(n=5)
In [ ]:
# Load the "movies" type of the "demo" index through the es-hadoop data source,
# then preview its schema and a few selected columns.
movie_df = sqlContext.read.load("demo/movies", format="es")
movie_df.printSchema()
movie_df.select("movieId", "title", "genres", "release_date", "popularity").show(n=5)
In [ ]:
# Load the "ratings" type of the "demo" index; this DataFrame is the ALS
# training input in the next cell.
ratings_df = sqlContext.read.load("demo/ratings", format="es")
ratings_df.printSchema()
ratings_df.show(n=5)
In [ ]:
from pyspark.ml.recommendation import ALS
# Fit an ALS factorisation (rank 20, regParam 0.1) of the user/movie rating
# matrix, then preview the learned user factors followed by the item factors.
als = ALS(
    rank=20,
    regParam=0.1,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
)
model = als.fit(ratings_df)
for factor_df in (model.userFactors, model.itemFactors):
    factor_df.show(5)
In [ ]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf, lit
def convert_vector(x):
    '''Encode a sequence of numbers as space-separated "position|value" tokens.

    Any iterable of numbers works (list, tuple, numpy array, Spark vector).
    Example: [1.5, -2.0] -> "0|1.5 1|-2.0". This is the text form fed to the
    delimited payload token filter.
    '''
    return " ".join("%s|%s" % pair for pair in enumerate(x))
def reverse_convert(s):
    '''Decode a "position|value" token string back into a list of floats.

    Inverse of convert_vector: "0|1.5 1|-2.0" -> [1.5, -2.0]. Values are
    returned in token order; the position part of each token is ignored.
    Any run of whitespace separates tokens, so an empty (or blank) string
    decodes to an empty list instead of raising.
    '''
    # str.split() with no argument drops empty fields, unlike split(" ")
    # which yields '' for "" or for doubled spaces.
    return [float(token.split("|", 1)[1]) for token in s.split()]
def vector_to_struct(x, version):
    '''Package a factor vector and a model version as a (factor, version) tuple.

    The first element is the vector encoded by convert_vector; the second is
    ``version`` unchanged. The tuple order lines up with the "factor" and
    "version" fields of the StructType used by the vector_struct UDF.
    '''
    encoded = convert_vector(x)
    return encoded, version
# Spark SQL UDF around vector_to_struct. It returns a struct of two nullable
# string fields, "factor" and "version", mirroring the "@model" object in the
# index mapping.
_model_struct_type = StructType([
    StructField("factor", StringType(), True),
    StructField("version", StringType(), True),
])
vector_struct = udf(vector_to_struct, _model_struct_type)
In [ ]:
# Sanity-check the vector conversion on the first learned user factor vector.
# Single-argument print(...) calls behave the same on Python 2 and Python 3,
# whereas the bare `print x` statement form is a SyntaxError on Python 3.
test_vec = model.userFactors.select("features").first().features
print(test_vec)
print("")  # blank separator line (a bare print() would print "()" on Python 2)
print(convert_vector(test_vec))
In [ ]:
# Tag every factor vector with the fitted model's uid as its version.
ver = model.uid


def _attach_model(factors_df):
    # Keep the factor row's "id" and replace "features" with an "@model" struct
    # holding the delimited factor string and the model version.
    return factors_df.select("id", vector_struct("features", lit(ver)).alias("@model"))


movie_vectors = _attach_model(model.itemFactors)
movie_vectors.select("id", "@model.factor", "@model.version").show(5)
user_vectors = _attach_model(model.userFactors)
user_vectors.select("id", "@model.factor", "@model.version").show(5)
In [ ]:
# Write the movie vectors back into ES:
# - es.mapping.id = "id": the "id" column is used as the ES document id
# - es.write.operation = "update": update the existing movie documents
# - Spark save mode "append": write into the existing (non-empty) index
(movie_vectors.write
    .format("es")
    .option("es.mapping.id", "id")
    .option("es.write.operation", "update")
    .mode("append")
    .save("demo/movies"))
In [ ]:
# Same as for movies: update the existing user documents (ES id taken from the
# "id" column) with their "@model" vector, using Spark's "append" save mode.
(user_vectors.write
    .format("es")
    .option("es.mapping.id", "id")
    .option("es.write.operation", "update")
    .mode("append")
    .save("demo/users"))
In [ ]:
# Query-string search over the movies type, returning only the top hit (kept as
# the cell's final expression so the response is displayed).
es.search(q="star wars force", size=1, index="demo", doc_type="movies")
In [ ]:
from IPython.display import Image, HTML, display
def fn_query(query_vec, q="*", cosine=False):
    '''Build a function_score search body that ranks by vector similarity.

    Documents are first restricted by the query string ``q`` (default "*").
    They are then scored solely by the native "payload_vector_score" script,
    which receives the field "@model.factor", ``query_vec`` and the ``cosine``
    flag as params; boost_mode "replace" discards the text-match score.
    ``cosine`` presumably toggles cosine normalisation inside the script (the
    script itself is not visible here).
    '''
    text_filter = {"query_string": {"query": q}}
    script_params = {
        "field": "@model.factor",
        "vector": query_vec,
        "cosine": cosine,
    }
    scoring = {
        "script": "payload_vector_score",
        "lang": "native",
        "params": script_params,
    }
    scored_query = {
        "query": text_filter,
        "script_score": scoring,
        "boost_mode": "replace",
    }
    return {"query": {"function_score": scored_query}}
def get_similar(the_id, q="*", num=10, index="demo", dt="movies"):
    '''Find the documents whose "@model.factor" vectors are closest to ``the_id``'s.

    Fetches document ``the_id``, decodes its factor vector and runs a
    cosine-similarity function_score search (built by fn_query), restricted by
    the query string ``q``.

    Returns a tuple ``(source, hits)``:
      source - the fetched document's ``_source`` dict
      hits   - up to ``num`` raw search hits, best first, never including the
               query document itself; empty when the document has no stored
               factor vector.
    '''
    response = es.get(index=index, doc_type=dt, id=the_id)
    src = response['_source']
    raw_vec = (src.get('@model') or {}).get('factor')
    if not raw_vec:
        # No ALS vector stored for this document: nothing to compare against.
        return src, []
    # The scoring script takes the query vector in plain list form and
    # handles conversion internally.
    query_vec = reverse_convert(raw_vec)
    body = fn_query(query_vec, q=q, cosine=True)
    # Request one extra hit because the query document (if it matches q) will
    # normally be the top hit and is removed below; without an explicit size
    # Elasticsearch returns only 10 hits.
    body['size'] = num + 1
    # Keyword arguments: the positional order of search() differs between
    # elasticsearch-py versions.
    results = es.search(index=index, doc_type=dt, body=body)
    # Drop the query document by id rather than blindly skipping the first hit:
    # when q filters it out, the first hit is a genuine recommendation.
    query_id = str(the_id)
    hits = [hit for hit in results['hits']['hits'] if str(hit['_id']) != query_id]
    return src, hits[:num]
def display_similar(the_id, q="*", num=10, index="demo", dt="movies"):
    '''Render a movie's poster followed by a grid of its most similar movies.

    Recommendations come from get_similar(the_id, q, num, index, dt). Each
    recommendation is shown as its poster (``_source['image_url']``) next to
    its similarity score, five per table row.
    '''
    from xml.sax.saxutils import quoteattr

    movie, recs = get_similar(the_id, q, num, index, dt)
    # display query
    display(HTML("<h2>Get similar movies for:</h2>"))
    display(Image(movie['image_url'], width=200))
    display(HTML("<br>"))
    display(HTML("<h2>Similar movies:</h2>"))

    per_row = 5
    rows = []
    for start in range(0, len(recs), per_row):
        # quoteattr quotes and HTML-escapes the URL, so indexed data cannot
        # break out of (or inject into) the src attribute.
        cells = [
            "<td><img src=%s width=200></td><td>%2.3f</td>"
            % (quoteattr(rec['_source']['image_url']), rec['_score'])
            for rec in recs[start:start + per_row]
        ]
        rows.append("<tr>" + "".join(cells) + "</tr>")
    display(HTML("<table border=0>" + "".join(rows) + "</table>"))
In [ ]:
# Five movies most similar to movie id 122886, with no extra filtering.
display_similar(the_id=122886, num=5)
In [ ]:
# Same query movie, excluding candidates whose title matches "trek".
display_similar(the_id=122886, num=5, q="title:(NOT trek)")
In [ ]:
# Movies similar to movie id 6377, limited to the "children" genre and to release
# dates from the start of the year two years ago up to now.
display_similar(the_id=6377, num=5, q="genres:children AND release_date:[now-2y/y TO now]")